1   /**
2    * Copyright 2014 Netflix, Inc.
3    *
4    * Licensed under the Apache License, Version 2.0 (the "License"); you may not
5    * use this file except in compliance with the License. You may obtain a copy of
6    * the License at
7    *
8    * http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations under
14   * the License.
15   */
16  package rx.internal.operators;
17  
18  import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
19  import rx.Observable;
20  import rx.Scheduler;
21  import rx.Subscriber;
22  import rx.functions.Action0;
23  import rx.functions.Func2;
24  import rx.schedulers.Schedulers;
25  import rx.subscriptions.SerialSubscription;
26  
27  public final class OperatorRetryWithPredicate<T> implements Observable.Operator<T, Observable<T>> {
28      final Func2<Integer, Throwable, Boolean> predicate;
29      public OperatorRetryWithPredicate(Func2<Integer, Throwable, Boolean> predicate) {
30          this.predicate = predicate;
31      }
32  
33      @Override
34      public Subscriber<? super Observable<T>> call(final Subscriber<? super T> child) {
35          final Scheduler.Worker inner = Schedulers.trampoline().createWorker();
36          child.add(inner);
37          
38          final SerialSubscription serialSubscription = new SerialSubscription();
39          // add serialSubscription so it gets unsubscribed if child is unsubscribed
40          child.add(serialSubscription);
41          
42          return new SourceSubscriber<T>(child, predicate, inner, serialSubscription);
43      }
44      
45      static final class SourceSubscriber<T> extends Subscriber<Observable<T>> {
46          final Subscriber<? super T> child;
47          final Func2<Integer, Throwable, Boolean> predicate;
48          final Scheduler.Worker inner;
49          final SerialSubscription serialSubscription;
50          
51          volatile int attempts;
52          @SuppressWarnings("rawtypes")
53          static final AtomicIntegerFieldUpdater<SourceSubscriber> ATTEMPTS_UPDATER
54                  = AtomicIntegerFieldUpdater.newUpdater(SourceSubscriber.class, "attempts");
55  
56          public SourceSubscriber(Subscriber<? super T> child, final Func2<Integer, Throwable, Boolean> predicate, Scheduler.Worker inner, 
57                  SerialSubscription serialSubscription) {
58              this.child = child;
59              this.predicate = predicate;
60              this.inner = inner;
61              this.serialSubscription = serialSubscription;
62          }
63          
64          
65          @Override
66              public void onCompleted() {
67                  // ignore as we expect a single nested Observable<T>
68              }
69  
70              @Override
71              public void onError(Throwable e) {
72                  child.onError(e);
73              }
74  
75              @Override
76              public void onNext(final Observable<T> o) {
77                  inner.schedule(new Action0() {
78  
79                      @Override
80                      public void call() {
81                          final Action0 _self = this;
82                          ATTEMPTS_UPDATER.incrementAndGet(SourceSubscriber.this);
83  
84                          // new subscription each time so if it unsubscribes itself it does not prevent retries
85                          // by unsubscribing the child subscription
86                          Subscriber<T> subscriber = new Subscriber<T>() {
87                              boolean done;
88                              @Override
89                              public void onCompleted() {
90                                  if (!done) {
91                                      done = true;
92                                      child.onCompleted();
93                                  }
94                              }
95  
96                              @Override
97                              public void onError(Throwable e) {
98                                  if (!done) {
99                                      done = true;
100                                     if (predicate.call(attempts, e) && !inner.isUnsubscribed()) {
101                                         // retry again
102                                         inner.schedule(_self);
103                                     } else {
104                                         // give up and pass the failure
105                                         child.onError(e);
106                                     }
107                                 }
108                             }
109 
110                             @Override
111                             public void onNext(T v) {
112                                 if (!done) {
113                                     child.onNext(v);
114                                 }
115                             }
116 
117                         };
118                         // register this Subscription (and unsubscribe previous if exists) 
119                         serialSubscription.set(subscriber);
120                         o.unsafeSubscribe(subscriber);
121                     }
122                 });
123             }
124     }
125 }